<?php

namespace mg;

use Closure;

/**
 * The ConcurrentStreamProcessor is a class responsible for fetching stream data from multiple resources
 * concurrently.
 *
 * @author Michiel Hakvoort <michiel@hakvoort.it>
 */
class ConcurrentStreamProcessor {

    /**
     * Queued and active entries keyed by stream id; every entry is
     * array('resource' => mixed, 'callback' => Closure).
     */
    protected $streams = null;

    /**
     * Streams of the opened wrappers (as returned by getStream()), keyed by stream id.
     */
    protected $activeStreams = null;

    /**
     * Stream ids that still have to be opened, in FIFO order.
     */
    protected $queuedStreams = null;

    /**
     * Maximum number of streams that are kept open at the same time.
     */
    protected $concurrentStreamCount = null;

    /**
     * Id which is assigned to the next queued stream.
     */
    protected $streamId = 0;

    /**
     * Opened wrappers keyed by stream id.
     */
    protected $wrappers = null;

    /**
     * Registered stream mappers; the most recently registered mapper comes first.
     */
    protected $streamMappers = null;

    /**
     * @param int $concurrentStreamCount maximum number of simultaneously opened streams
     */
    public function __construct($concurrentStreamCount = 10) {
        $this->concurrentStreamCount = $concurrentStreamCount;

        $this->streams = array();
        $this->activeStreams = array();
        $this->queuedStreams = array();
        $this->wrappers = array();
        $this->streamMappers = array();

        // Mappers are prepended on registration, so the fallback mapper is consulted
        // before the resource mapper.
        $this->registerStreamMapper(new ResourceStreamMapper());
        $this->registerStreamMapper(new FallbackStreamMapper());
    }

    /**
     * Queue a resource for processing by run().
     *
     * @param mixed $resource value which is handed to the stream mappers when the stream is opened
     * @param Closure $callback invoked with the wrapper's data once the stream has been read completely
     */
    public function queue($resource, Closure $callback) {
        $streamId = $this->streamId;

        $this->streams[$streamId] = array('resource' => $resource, 'callback' => $callback);
        $this->queuedStreams[] = $streamId;

        $this->streamId++;
    }

    /**
     * Register a stream mapper. The mapper is placed in front of the previously
     * registered mappers and is therefore consulted before them.
     *
     * @param StreamMapper $streamMapper
     */
    public function registerStreamMapper(StreamMapper $streamMapper) {
        array_unshift($this->streamMappers, $streamMapper);
    }

    /**
     * Process all queued streams, keeping at most concurrentStreamCount streams open
     * at once. Returns immediately when nothing is queued.
     *
     * @throws \InvalidArgumentException when no registered mapper can handle a queued resource
     * @throws \Exception when none of the active streams becomes readable within 10 seconds
     */
    public function run() {
        if(count($this->queuedStreams) === 0) {
            return;
        }

        do {
            $this->activateQueuedStreams();

            $in = $this->activeStreams;
            $write = null;
            $except = null;

            // NOTE(review): the return value of stream_select() is not inspected. On failure
            // $in presumably stays untouched, which makes every active stream get read below
            // (possibly blocking) - confirm before turning a failure into an exception.
            stream_select($in, $write, $except, 10);

            if(count($in) === 0) {
                // Fully qualified: an unqualified Exception would resolve to \mg\Exception.
                throw new \Exception("Streams timed out");
            }

            foreach($in as $resource) {
                $streamId = array_search($resource, $this->activeStreams, true);

                if($streamId === false) {
                    // Not (or no longer) one of the active streams, nothing to read.
                    continue;
                }

                $this->readStream($streamId);
            }

        } while(count($this->activeStreams) > 0 || count($this->queuedStreams) > 0);
    }

    /**
     * Open queued streams in FIFO order until either the queue is empty or
     * concurrentStreamCount streams are active.
     */
    private function activateQueuedStreams() {
        while(count($this->activeStreams) < $this->concurrentStreamCount && count($this->queuedStreams) > 0) {
            $streamId = array_shift($this->queuedStreams);

            $wrapper = $this->createWrapper($this->streams[$streamId]['resource']);

            if($wrapper === null) {
                // The id has already left the queue, drop the entry as well.
                unset($this->streams[$streamId]);

                throw new \InvalidArgumentException("No stream mapper is able to map the queued resource");
            }

            $wrapper->open();

            $this->activeStreams[$streamId] = $wrapper->getStream();
            $this->wrappers[$streamId] = $wrapper;
        }
    }

    /**
     * Ask the registered mappers, in order, for a wrapper for the given resource.
     *
     * @param mixed $resource
     * @return StreamWrapper|null null when no mapper yields a wrapper
     */
    private function createWrapper($resource) {
        foreach($this->streamMappers as $mapper) {
            /* @var $mapper \mg\StreamMapper */
            if(!$mapper->canMap($resource)) {
                continue;
            }

            $wrapper = $mapper->map($resource);

            if($wrapper !== null) {
                return $wrapper;
            }
        }

        return null;
    }

    /**
     * Read from the stream with the given id. When the wrapper reports that nothing
     * more was read, the stream is closed, all bookkeeping for it is removed and its
     * callback is invoked with the collected data.
     *
     * @param int $streamId
     */
    private function readStream($streamId) {
        /* @var $wrapper \mg\StreamWrapper */
        $wrapper = $this->wrappers[$streamId];

        if($wrapper->read()) {
            return;
        }

        $wrapper->close();
        $data = $wrapper->getData();

        $callback = $this->streams[$streamId]['callback'];

        // Clean up before invoking the callback, so the processor state is consistent
        // while the callback runs.
        unset($this->streams[$streamId]);
        unset($this->activeStreams[$streamId]);
        unset($this->wrappers[$streamId]);

        $callback($data);
    }
}

/**
 * A StreamMapper decides whether it can handle a queued value and, if so, creates
 * the StreamWrapper through which the ConcurrentStreamProcessor opens and reads it.
 */
interface StreamMapper {

    /**
     * Tell whether this mapper is able to create a wrapper for the given value.
     *
     * @param mixed $resource the value which was passed to ConcurrentStreamProcessor::queue()
     * @return bool true when map() may be called with this value
     */
    public function canMap($resource);

    /**
     * Create the wrapper for the given value. The processor only calls this after
     * canMap() has accepted the same value.
     *
     * @param mixed $resource the value which was passed to ConcurrentStreamProcessor::queue()
     * @return StreamWrapper
     */
    public function map($resource);

}

class ResourceStreamMapper implements StreamMapper {

    /**
     * Accepts every value which is a PHP resource.
     *
     * @param mixed $resource
     * @return bool
     */
    public function canMap($resource) {
        $isHandle = is_resource($resource);

        return $isHandle;
    }

    /**
     * Wrap the given resource in a ResourceStreamWrapper.
     *
     * @param resource $resource
     * @return ResourceStreamWrapper
     */
    public function map($resource) {
        $wrapper = new ResourceStreamWrapper($resource);

        return $wrapper;
    }
}

class FallbackStreamMapper implements StreamMapper {

    /**
     * Accepts every string value.
     *
     * @param mixed $resource
     * @return bool
     */
    public function canMap($resource) {
        $isLocation = is_string($resource);

        return $isLocation;
    }

    /**
     * Wrap the given string in a FallbackStreamWrapper.
     *
     * @param string $resource
     * @return FallbackStreamWrapper
     */
    public function map($resource) {
        $wrapper = new FallbackStreamWrapper($resource);

        return $wrapper;
    }
}

class HttpStreamMapper implements StreamMapper {

    /**
     * Accepts strings which parse as a URL with both a host and the http scheme.
     *
     * @param mixed $resource
     * @return bool
     */
    public function canMap($resource) {
        if(!is_string($resource)) {
            return false;
        }

        $components = parse_url($resource);

        if($components === false || !isset($components['scheme']) || !isset($components['host'])) {
            return false;
        }

        // URI schemes are case-insensitive, so "HTTP://host/" has to be accepted as well.
        return strtolower($components['scheme']) === 'http';
    }

    /**
     * Wrap the given URL in a HttpStreamWrapper.
     *
     * @param string $resource
     * @return HttpStreamWrapper
     */
    public function map($resource) {
        return new HttpStreamWrapper($resource);
    }
}

/**
 * A StreamWrapper encapsulates a single stream which is processed by the
 * ConcurrentStreamProcessor.
 */
interface StreamWrapper {

    /**
     * Called by the processor once, before the stream is requested through getStream().
     */
    public function open();

    /**
     * Called by the processor after read() reported that the stream is finished.
     */
    public function close();

    /**
     * Return the stream which the processor passes to stream_select().
     */
    public function getStream();

    /**
     * Called by the processor when the stream was reported readable.
     *
     * @return bool a falsy value makes the processor close the wrapper and deliver its data
     */
    public function read();

    /**
     * Return the data which the processor passes to the callback of the stream.
     * The processor calls this after close().
     */
    public function getData();
}

/**
 * Simple HttpStreamWrapper (for now, only supports simple GET request over unencrypted HTTP streams)
 *
 * @author Michiel Hakvoort <michiel@hakvoort.it>
 *
 */
class HttpStreamWrapper implements StreamWrapper {

    /**
     * The URL to fetch.
     */
    protected $location = null;

    /**
     * Everything received so far, response headers included.
     */
    protected $data = null;

    /**
     * The socket stream, available after open().
     */
    protected $resource = null;

    /**
     * @param string $location the http URL to fetch
     */
    public function __construct($location) {
        $this->location = $location;
        $this->data = '';
    }

    /**
     * Connect to the host of the location (port 80 unless the URL names one) and
     * send a HTTP/1.0 GET request for its path and query.
     *
     * @throws \InvalidArgumentException when the location cannot be parsed or has no host
     * @throws \RuntimeException when connecting or sending the request fails
     */
    public function open() {
        $components = parse_url($this->location);

        if($components === false || !isset($components['host'])) {
            throw new \InvalidArgumentException("Invalid HTTP location: {$this->location}");
        }

        $port = isset($components['port']) ? $components['port'] : 80;
        $remoteSocket = "tcp://{$components['host']}:$port";

        $errorNumber = null;
        $errorString = null;

        $resource = stream_socket_client($remoteSocket, $errorNumber, $errorString, 5, STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);

        if($resource === false) {
            throw new \RuntimeException("Unable to connect to {$remoteSocket}: {$errorString} ({$errorNumber})");
        }

        $get = isset($components['path']) ? $components['path'] : '/';

        if(isset($components['query'])) {
            $get .= '?' . $components['query'];
        }

        $request = "GET {$get} HTTP/1.0\r\nHost: {$components['host']}\r\n\r\n";

        // A write to a network stream may stop before the whole string has been
        // written, so keep writing until nothing is left.
        $remaining = $request;

        while($remaining !== '') {
            $written = fwrite($resource, $remaining);

            if($written === false || $written === 0) {
                fclose($resource);

                throw new \RuntimeException("Unable to send request to {$remoteSocket}");
            }

            $remaining = (string) substr($remaining, $written);
        }

        $this->resource = $resource;
    }

    /**
     * Close the socket. Does nothing when the socket was never opened or is closed already.
     */
    public function close() {
        if(is_resource($this->resource)) {
            fclose($this->resource);
        }
    }

    /**
     * @return resource|null the socket stream, null before open()
     */
    public function getStream() {
        return $this->resource;
    }

    /**
     * Read up to 8192 bytes from the socket and append them to the received data.
     *
     * @return bool true when data was read, false when nothing was read or reading failed
     */
    public function read() {
        $data = fread($this->resource, 8192);

        if($data === false) {
            return false;
        }

        $this->data .= $data;

        return strlen($data) > 0;
    }

    /**
     * Return the response body, i.e. everything after the first empty line.
     *
     * @return string the body, or an empty string when no header terminator was received
     */
    public function getData() {
        $headerStop = strpos($this->data, "\r\n\r\n");

        if($headerStop === false) {
            return '';
        }

        // Skip the four bytes of the "\r\n\r\n" terminator itself.
        return substr($this->data, $headerStop + 4);
    }

}

class FallbackStreamWrapper implements StreamWrapper {

    /**
     * Everything read so far.
     */
    protected $data;

    /**
     * The stream returned by fopen(), available after open().
     */
    protected $resource;

    /**
     * The location which is passed to fopen().
     */
    protected $location;

    /**
     * @param string $location anything fopen() is able to open for reading
     */
    public function __construct($location) {
        $this->location = $location;
        $this->data = '';
    }

    /**
     * Open the location for reading.
     *
     * @throws \RuntimeException when the location cannot be opened
     */
    public function open() {
        $resource = fopen($this->location, 'r');

        if($resource === false) {
            // Never hand out false as stream, it would end up in stream_select().
            throw new \RuntimeException("Unable to open {$this->location}");
        }

        $this->resource = $resource;
    }

    /**
     * Close the stream. Does nothing when the stream was never opened or is closed already.
     */
    public function close() {
        if(is_resource($this->resource)) {
            fclose($this->resource);
        }
    }

    /**
     * @return resource|null the opened stream, null before open()
     */
    public function getStream() {
        return $this->resource;
    }

    /**
     * Read up to 8192 bytes from the stream and append them to the collected data.
     *
     * @return bool true when data was read, false when nothing was read or reading failed
     */
    public function read() {
        $data = fread($this->resource, 8192);

        if($data === false) {
            return false;
        }

        $this->data .= $data;

        return strlen($data) > 0;
    }

    /**
     * @return string everything which was read from the stream
     */
    public function getData() {
        return $this->data;
    }

}

class ResourceStreamWrapper implements StreamWrapper {

    /**
     * Everything read so far.
     */
    protected $data;

    /**
     * The stream which was handed to the constructor.
     */
    protected $resource;

    /**
     * @param resource $resource an already opened stream
     */
    public function __construct($resource) {
        $this->data = '';
        $this->resource = $resource;
    }

    /**
     * Nothing to do, the stream is handed over by the caller.
     */
    public function open() {
    }

    /**
     * Close the wrapped stream.
     */
    public function close() {
        fclose($this->resource);
    }

    /**
     * @return resource the wrapped stream
     */
    public function getStream() {
        return $this->resource;
    }

    /**
     * Read up to 8192 bytes from the stream and append them to the collected data.
     *
     * @return bool true when at least one byte was read
     */
    public function read() {
        $chunk = fread($this->resource, 8192);

        $this->data = $this->data . $chunk;

        return 0 < strlen($chunk);
    }

    /**
     * @return string everything which was read from the stream
     */
    public function getData() {
        return $this->data;
    }
}


